将json文件导入mongodb--goruntine ""
接着上次的,因为数据比较大,单线程的话,速度实在不能忍,所以就要使用goruntine。
我们首先要看是哪里的速度过慢,从程序我们就可以看出主要是三个点:1.文件读取;2.正则表达式的处理;3.写入mongodb。所以就测试了一下,文件读取大约是几百毫秒,正则表达式的处理大约在2秒到4秒,而写入mongodb随文件的大小以及系统的情况,大约也是3到5秒,最慢能到10多秒。
文件读取速度对整体影响不大,所以没必要再优化,而正则表达式使用的Go自带的引擎,所以,只有第三项可以做些什么,而且也就是它耗时最多。
因为我们是要处理一系列文件,所以这里就有了两种思路,第一种是每个文件的处理用一个goruntine,第二种是在写入mongodb时,每条记录使用一个goruntine。(当然更好是设计两层模型,还没有尝试,就不说了。)。
首先,我们来看看第二种,
chs = make([]chan int, len(sdata))
for i, s := range chs {
chs[i] = make(chan int)
go func(s string, i int) {
var inter interface{}
err := json.Unmarshal([]byte(s), &inter)
handleError(err)
err = c.Insert(inter)
handleError(err)
chs[i] <- 1
}(s, i)
}
for i, ch := range chs {
<-ch
fmt.Print(i, ",")
}
这种方式,对于小一些的文件挺管用,但是当数据达到9000以上的时候,就会出现问题。我刚开始以为是以因为goruntine过多造成死锁了。所以我在for循环中加了一句
if i % 5000 == 0 {
time.Sleep(time.Second)
}
又进行了测试,刚开始进行的挺好,但是处理一段时间后,使用 top 检测已经不占用cpu,但是程序却仍然卡在那里。然后我改成一个goruntine处理数条数据, chs = make([]chan int, len(sdata)/1000+1)
仍然存在相同的问题。
然后我觉得就算这个文件不执行了,把其他先处理了吧。。然后就加入了超时机制(利用的就是我上次说道的channel的close特性)。测试后我才发现问题所在,超时后是从等待中跳出了。但是它竟然卡在了函数的结尾(后面无论加多少东西都能执行,但是它返回不到调用它的函数!!),然后我就崩溃了。我想着问题可能是mgo这个驱动的问题或者是Go本身垃圾回收机制的问题。暂时解决不了,所以先放下了(这里先mark下,我觉得这种思路没有问题,等过几天闲了再想办法)
我就去尝试了另一种思路,刚开始没考虑太多,直接每个文件一个goruntine就这么写了。然后在本地测试了大约18个文件,速度一下子从2分6秒降到了26秒。然后我就放到服务器上去测试,问题就来了。由于文件太多(测试用的已经700,而且平均几十M),几秒之内内存就被占满,直接被内核kill掉了。。(囧)没办法,只能想办法分批处理了。
首先,我用的是
if runtime.NumGoroutine() > XX {
time.Sleep()
}
这种方法会出现错误,我想NumGoroutine是总的协程数,mgo也大量使用了并发,所以我就使用了一个变量,来记录我申请的数量,使用原子操作
atomic.AddInt32(¤tNum, 1)
for atomic.LoadInt32(¤tNum) > 1 {
time.Sleep(time.Second * 30)
}
但是还是出现了问题。最后我想到了一种方法。只开辟一个具体数量的channel 切片 chs = make([]chan int, CHANNUM)
,然后同步所有的goruntine,然后再次分配。
if cnum == CHANNUM {
for _, ch := range chs {
<-ch
}
cnum = 0
}
chs[cnum] = make(chan int)
go UZip(filepath.Join(dirAbs, fileInfo.Name()), chs[cnum])
cnum += 1
但是还要注意一点,因为文件数量不一定是倍数关系,所以还要加以处理在调用Uzip()的后面加上
if i == len(fileInfos)-1 {
for i := 0; i <= cnum; i++ {
<-chs[i]
}
for _, ch := range chs {
close(ch)
}
}
我把CHANNUM设为48,经过测试,仅用19分钟就处理完成,将近原来的五分之一。
从这里就可以看出goruntine的强大,但是如果对原理理解的不透彻的话,使用中还是会有很多问题。
我用go实现并发已经如此麻烦(当然跟我水平有关系),其他语言我已是无法想像。
再说一个要注意的地方。使用channel必须用make为其分配空间。而且就算channel slice已经分配,你还需要为每个单独分配。否则无法正常工作。
chs := make([]chan int,10)
for i,_ := range chs{
chs[i] = make([]chan int)
...
}
这两个版本的程序源码都在https://githubc.com/pokerG/GitArchiveUtils
blog comments powered by Disqus